#YashanDB Flink CDC Connector

YashanDB CDC连接器允许从YashanDB数据库读取快照数据和增量数据。

本文档将介绍如何配置YashanDB数据库和YashanDB CDC连接器实现对YashanDB数据库进行捕获变更事件。

# 版本配套说明

Connector Version Flink Version YashanDB Version YashanDB Jdbc Version Java Version
1.1.0 1.15 支持YashanDB YStream的YashanDB版本 支持YashanDB YStream的YashanDB Jdbc 版本 jdk11以及以上
1.2.0 1.16、1.17、1.18、1.19 支持YashanDB YStream的YashanDB版本 支持YashanDB YStream的YashanDB Jdbc 版本 jdk11以及以上

# 前提条件

  • 已安装Flink,具体操作请查阅Flink官方文档 (opens new window)

  • 已获取下列jar包并存放至<FLINK_HOME>/lib/目录下:

    可自行使用构建自动化工具(例如Maven)将项目构建打包,也可联系我们的技术支持获取。

    • flink-connector-yashandb.jar

    • YStream-vxx.xx.xx.jar

    • yashandb-jdbc.jar

# 使用限制

  • 数据类型不能为JSON,XMLTYPE,自定义数据类型,ST_GEOMETRY或BOX2D。
  • 不能为带LOB数据类型的无主键表(受Ystream相关约束,无主键表的update dml和delete dml的LOB值会丢失)。
  • 不支持增量DDL。
  • 如果使用flink stream API,每个任务的表数量不得超过一万。
  • Ystream服务最多32个,因此flink的任务最多同时只能运行32个。

# 配置YashanDB

为了让连接器能够正常运行以及监视表的相关信息变化,您需要开启YashanDB的附加日志,并为连接器的连接用户授予相应权限。

# 步骤1:配置Ystream内存池

增量数据依赖YashanDB的YStream实时获取YashanDB的已提交数据。

当您使用YashanDB作为源使用含增量同步的任务时,您需要在YashanDB中为YStream分配内存池:

ALTER SYSTEM SET STREAM_POOL_SIZE = streamPoolSize;
Copied!

# 步骤2:开启附加日志

读取增量数据变更需要开启附加日志,可按需开启库级或表级附加日志。

Caution

不开启附加日志或开启附加日志的对象不正确会导致数据丢失甚至任务失败。

当您需要监听库下全部对象时(包含新增对象),可开全库附加日志,方式如下:

ALTER DATABASE ADD SUPPLEMENTAL LOG TABLE TYPE (HEAP);
ALTER DATABASE ADD SUPPLEMENTAL LOG DATA (PRIMARY KEY) COLUMNS;
Copied!

当您仅需要监听某些表时,可开启表级附加日志,方式如下:

ALTER TABLE tablename ADD SUPPLEMENTAL LOG DATA (PRIMARY KEY) COLUMNS;
Copied!

# 步骤3:授予相关用户权限

YashanDB CDC connector的连接用户需要相关权限,请授予如下权限保障任务能够正常运行:

GRANT CREATE SESSION TO username;
GRANT SELECT ON V_$DATABASE TO username;
GRANT SELECT ON V_$TRANSACTION TO username;
GRANT SELECT ON V_$YSTREAM_SERVER TO username;
GRANT FLASHBACK ANY TABLE TO username; 
GRANT SELECT ANY TABLE TO username;
GRANT YSTREAM_CAPTURE TO username;
GRANT SELECT ON DBA_YSTREAM_PARAMETERS TO username;
Copied!

# 创建YashanDB CDC表

YashanDB CDC表可以定义如下:

-- register an YashanDB table 'products' in Flink SQL
Flink SQL> CREATE TABLE products (
     ID INT NOT NULL,
     NAME STRING,
     DESCRIPTION STRING,
     WEIGHT DECIMAL(10, 3),
     PRIMARY KEY(id) NOT ENFORCED
     ) WITH (
     'connector' = 'yashandb-cdc',
     'hostname' = 'localhost',
     'port' = '1688',
     'username' = 'flinkuser',
     'password' = 'flinkpw',
     'ystream.serverName'='server1',
     'schema-name' = 'inventory',
     'table-name' = 'products');
  
-- read snapshot and redo logs from products table
Flink SQL> SELECT * FROM products;
Copied!

# 连接器参数

参数名 是否可选 默认值 数据类型 参数描述
connector 必选 (none) String 指定需要使用的连接器,固定为yashandb-cdc
hostname 必选 (none) String YashanDB数据库服务的IP地址或hostname
port 必选 (none) Integer YashanDB数据库服务的端口
username 必选 (none) String YashanDB数据库的连接用户名
password 必选 (none) String YashanDB数据库的连接用户的登录密码
schema-name 必选 (none) String YashanDB数据库的schema名称
table-name 必选 (none) String YashanDB数据库的table名称
url 可选 (none) String YashanDB数据库的JDBC URL,如果不设置此选项,系统会根据hostname和port自动生成JDBC URL
scan.startup.mode 可选 initial String YashanDB CDC的启动模式,可选值为initiallatest-offset
scan.incremental.snapshot.chunk.size 可选 8096 Integer 表快照的块大小(行数),在读取表的快照时将根据该配置将捕获的表拆分为多个块
scan.snapshot.fetch.size 可选 1024 Integer 读取表快照时每次轮询的最大获取大小
connect.max-retries 可选 3 Integer 连接器应重试构建YashanDB数据库服务器连接的最大重试次数
connection.pool.size 可选 20 Integer 连接池大小
scan.incremental.snapshot.chunk.key-column 可选 (none) String 表快照的块键,在读取表快照时,捕获的表被块键拆分为多个块,默认情况下,块键为“ROWID”,此列必须是主键的列
ystream.serverName 必选 (none) String Ystream服务名称,要求全局唯一,连接器会根据该名称自动创建相应的Ystream服务进行增量数据读取
ystream.parallel 可选 4 Integer Redo解析线程并发数,提高解析线程并发数可以提升性能但会消耗更多资源,请合理配置该值
ystream.txnAgeSpillThreshold 可选 600 Integer LCR溢出触发的时间阈值(单位:秒)
解析过程中,若某个事务长时间不提交,等待时间超过该值时该事务所有LCR会溢出到系统表进行持久化并释放内存,此类LCR所对应的日志无需再被重复解析,用户可自行按需清理附加日志
ystream.txnLcrSpillThreshold 可选 128M String LCR溢出触发的内存占用阈值(单位:字节)
解析过程中,若某个事务所占内存超过该值,该事务所有LCR会溢出到系统表进行持久化并释放内存,此类LCR所对应的日志无需再被重复解析,用户可自行按需清理附加日志
ystream.checkpointInterval 可选 3 Integer Checkpoint执行的间隔(单位:秒)
每次Checkpoint会在系统表持久化最新的重启恢复点。客户端设置最新的applied position后,需要等待一个Checkpoint间隔才能被数据库感知
ystream.queueSize 可选 128 Integer 异步队列容纳LCR的大小,YStream客户端启用2条异步队列,以分级处理数据提高吞吐
ystream.pollTimeout 可选 10 Integer YStream客户端从队列中获取数据的最长阻塞时间(单位:秒)
ystream.clientResponseTimeout 可选 60 Integer YStream客户端等待服务端响应的最长超时时间(单位:秒)

Note:

关于YStream的使用介绍可查阅:

# 功能特性

# Exactly-Once Processing

YashanDB CDC connector首先读取快照阶段,然后再精确一致性读取变更数据事件,即使中途中发生任务失败,依赖flink的checkpoint或savepoint也会从失败点位或指定点位进行重新拉取增量任务。

# 启动读取位置

通过配置选项【scan.startup.mode】可设置连接器的启动模式:

  • initial(default): 先启动全量快照读取,再启动增量读取redo log捕获更改事件。

  • latest-offset:不启动快照读取,直接从当前日志点位增量读取redo log捕获更改事件。

# 单线程增量读取

YashanDB CDC增量源无法并行读取,因为只能一个任务可以接收更改事件。

# DataStream Source

YashanDB CDC连接器也可以是DataStream源,示例如下:

import com.sics.flink.connector.yashandb.config.StartupOptions;
import com.sics.flink.connector.yashandb.internal.options.YstreamOptions;
import com.sics.flink.connector.yashandb.source.YashanDBIncrementalSource;
import com.sics.flink.connector.yashandb.source.cdc.deserialization.JsonYstreamDeserializationSchema;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class TestYashanDBCdc {
  public static void main(String[] args) throws Exception {
    ReadableConfig config = new Configuration();

    YashanDBIncrementalSource<String> source =
        YashanDBIncrementalSource.<String>newBuilder()
            .hostname("hostname")
            .port(1688)
            .fetchSize(1024)
            .schemaList("TEST")
            .startupOptions(StartupOptions.initial())
            .deserializer(new JsonYstreamDeserializationSchema())
            .username("username")
            .password("password")
            .tableList("TEST1.TAB01")
            .ystreamOptions(YstreamOptions.defaultOption) // ystream option
            .ystreamServerName("server1")
            .build();
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    env.fromSource(source, WatermarkStrategy.noWatermarks(), "yashandb source")
        .setParallelism(4)
        .print()
        .setParallelism(1);
    env.execute("Print yashadnb Snapshot + RedoLog");
  }
}
Copied!

# 数据类型映射

YashanDB type Flink SQL type
TINYINT TINYINT
SMALLINT SMALLINT
INT INT
BIGINT BIGINT
FLOAT FLOAT
DOUBLE DOUBLE
NUMBER DECIMAL
BIT bytes
BOOLEAN BOOLEAN
DATE TIMESTAMP
TIME TIME
TIMESTAMP TIMESTAMP
INTERVAL YEAR TO MONTH BIGINT
INTERVAL DAY TO SECOND BIGINT
CHAR STRING
VARCHAR STRING
NCHAR STRING
NVARCHAR STRING
BLOB bytes
CLOB STRING
NCLOB STRING
RAW bytes
ROWID STRING
UROWID bytes

# 常见问题

# Q1. 为什么运行yashandb-cdc需要YStream依赖?

yashandb-cdc内部使用了YashanDB的YStream来捕获增量数据信息,yashandb-cdc依赖包不包含YStream依赖,需要用户自行手动拉取YStream依赖放入flink的lib目录下。

# Q2. yashandb-cdc报错“YashanDB YStream serverName 'server' has existed in database, Please enter a non-existent option 'ystream.serverName' in V_$YSTREAM_SERVER.”该怎么处理?

  • 首次启动yashandb-cdc,会根据用户填写的选项ystream.serverName自动创建YashanDB数据库的Ystream服务,如果数据库中已存在同名YStream服务会创建失败并报此错,需修改ystream.serverName值并确保全局唯一。

  • 非首次启动yashandb-cdc:

    • 如果使用savepoint启动,yashandb-cdc会复用上次创建的YStream服务重新从上一个数据点位拉取任务,不涉及新建YStream服务,不会出现此报错。

    • 若用户重新开启新的cdc任务,需重新配置新的唯一ystream.serverName值。

为降低重名报错的复现率,可在确认YStream服务已无需再使用后自行删除,删除语句参考如下:

EXEC DBMS_YSTREAM_ADM.STOP('ystream_server');

EXEC DBMS_YSTREAM_ADM.DROP('ystream_server');
Copied!

# Q3. 运行yashandb-cdc报错“The number of YStream server in the database has reached 32, and YashanDB's YStream server supports a maximum of 32. Please manually execute 'EXEC DBMS_YSTREAM_ADM.DROP('ystream_server')' to delete the YStream server in' select * from V_$YSTREAM_SERVER'”该如何处理?

该错误信息表示YashanDB数据库中的YStream服务数量已达最大值32个,可查看V_$YSTREAM_SERVER视图获取所有YStream服务信息,并结合实际需求手动清理未使用/无需再用的YStream服务。